過去在製造業為了系統間溝通,我們往往是要寫Tx/Rx的程式,而每次要不同的生產數據就需要額外再寫另外一組傳輸程式,也增加了更多的維護成本,因此有需要想一個簡單的機制讓訊息流通更為簡單,剛好IOT議題讓我知道了Redis,他具有發布&訂閱機制非常好用,但因為他的機制有些時候會把訂閱者踢掉,不是很穩定,所以我轉而去看像是RabbitMQ、EMQ X 這種訊息broker,也能啟動MQTT的服務。 在此將以RabbitMQ啟動MQTT服務,然後用paho-mqtt-cpp函式庫來做mqtt client。
文末附上mqtt client測試code。
sudo apt-get update
sudo apt-get install rabbitmq-server
sudo service rabbitmq-server status
執行完應該會看到類似以下的資訊
● rabbitmq-server.service - RabbitMQ broker
Loaded: loaded (/lib/systemd/system/rabbitmq-server.service; enabled; vendor preset: enabled)
Active: active (running) since Tue 2023-05-02 15:27:34 UTC; 3 days ago
Main PID: 18509 (beam.smp)
Tasks: 220 (limit: 2295)
Memory: 325.1M
CPU: 32min 17.997s
CGroup: /system.slice/rabbitmq-server.service
sudo rabbitmq-plugins enable rabbitmq_mqtt
sudo rabbitmq-plugins list
有安裝的話,會在rabbitmq_mqtt那邊看到E的字樣代表啟動mqtt
Listing plugins with pattern ".*" ...
Configured: E = explicitly enabled; e = implicitly enabled
| Status: * = running on rabbit@hostname
|/
[e*] amqp_client 5.16.1
[ ] cowboy 1.1.2
[ ] cowlib 1.0.2
[ ] rabbitmq_amqp1_0 3.8.22
[E*] rabbitmq_mqtt 3.8.22
[ ] rabbitmq_stomp 3.8.22
以下安裝我是在使用者的家目錄下安裝,可以用 cd ~ 回到家目錄。 記得一定要先安裝 paho-mqtt-c,再安裝paho-mqtt-cpp才能順利安裝。
sudo apt-get update
sudo apt-get install -y build-essential gcc make cmake libssl-dev git
git clone https://github.com/eclipse/paho.mqtt.c.git
cd paho.mqtt.c
mkdir build && cd build
cmake -DPAHO_ENABLE_TESTING=OFF -DPAHO_BUILD_STATIC=ON -DPAHO_WITH_SSL=ON -DPAHO_HIGH_PERFORMANCE=ON ..
make
sudo make install
git clone https://github.com/eclipse/paho.mqtt.cpp.git
cd paho.mqtt.cpp
mkdir build && cd build
cmake -DPAHO_BUILD_STATIC=ON -DPAHO_BUILD_DOCUMENTATION=FALSE -DPAHO_BUILD_SAMPLES=FALSE -DPAHO_WITH_SSL=ON ..
make
sudo make install
sudo ldconfig
g++ -std=c++11 -o mqtt_example mqtt_example.cpp -lpaho-mqttpp3 -lpaho-mqtt3a
其中檔案名稱為mqtt_example.cpp
-o 代表輸出的檔名
-l 代表動態連結
paho-mqttpp3 是c++版本函式庫
paho-mqtt3a 是c版本函式庫,若mqtt有設定帳密,則可用paho-mqtt3as
#include <iostream>
#include <cstring>
#include "mqtt/async_client.h"
const std::string ADDRESS { "tcp://localhost:1883" };
const std::string CLIENT_ID { "paho_cpp_example" };
const std::string TOPIC { "test/topic" };
class callback : public virtual mqtt::callback {
public:
virtual void connection_lost(const std::string& cause) override {
std::cout << "Connection lost: " << cause << std::endl;
}
virtual void delivery_complete(mqtt::delivery_token_ptr tok) override {
std::cout << "Delivery complete" << std::endl;
}
virtual void message_arrived(mqtt::const_message_ptr msg) override {
std::cout << "Message received: " << msg->to_string() << std::endl;
}
};
int main(int argc, char* argv[]) {
mqtt::async_client client(ADDRESS, CLIENT_ID);
callback cb;
client.set_callback(cb);
mqtt::connect_options connOpts;
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);
try {
client.connect(connOpts)->wait_for(std::chrono::seconds(5));
}
catch (const mqtt::exception& exc) {
std::cerr << "Error: " << exc.what() << std::endl;
return 1;
}
// Subscribe to the topic
client.subscribe(TOPIC, 1)->wait_for(std::chrono::seconds(5));
// Publish a message to the topic
std::string payload { "Hello, world!" };
mqtt::message_ptr pubmsg = mqtt::make_message(TOPIC, payload);
pubmsg->set_qos(1);
client.publish(pubmsg)->wait_for(std::chrono::seconds(5));
// Wait for messages to arrive
std::this_thread::sleep_for(std::chrono::seconds(5));
client.disconnect()->wait_for(std::chrono::seconds(5));
return 0;
}